
Conversation

Contributor

@chenzl25 chenzl25 commented Sep 22, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

  • Tracking: locality backfill #23338
  • Previously we supported index selection for backfilling, but index selection only improves the locality of the leaf nodes of the DAG. To extend the idea of locality, we introduce locality enforcement for the non-leaf nodes of the DAG, i.e. a new type of backfilling: locality backfill. With locality backfill, we can provide good locality for intermediate results during backfilling. When backfilling large amounts of historical data with limited memory, this can significantly improve performance.
  • In the optimizer, we follow the try_better_locality method to enforce locality for an operator when it can't provide it, by inserting a new operator called LocalityProvider. Currently, we only generate this operator for LogicalJoin and LogicalScan. With this ability, users don't even need to create indexes themselves; but if users know their workload well, they can still create indexes to share them across different jobs.
  • The LocalityProvider has 2 state tables: one is used to buffer data during backfilling and provide data locality; the other is a progress table, like that of the normal backfill operator, to track its own backfilling progress.
  • We use the locality columns as the prefix of the state table pk to provide locality, so the table is implicitly sorted asynchronously by compactors (a minimal sketch follows this list).
  • Once we introduce locality backfill, the dependencies between scan backfill and intermediate locality backfill become important. We need to start the scan backfills first, followed by the intermediate locality backfills. Among all intermediate locality backfills, we also need to take care of the backfilling dependencies, since intermediate LocalityProvider operators can depend on each other. We extend this dependency ordering control based on feat(meta,frontend,streaming): support fixed backfill order control #20967.
  • TODO: The state table could be truncated after backfilling.
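
To make the key layout concrete, here is a minimal sketch (not the PR's actual code; the function name is made up for illustration) of how the buffer state table's primary key can be derived: locality columns first, then any input stream-key columns not already covered, so rows that share a locality prefix are stored adjacently and get sorted by the compactors.

// Minimal sketch: buffer table pk = locality columns + remaining stream-key columns.
fn buffer_table_pk(locality_columns: &[usize], input_stream_key: &[usize]) -> Vec<usize> {
    let mut pk: Vec<usize> = locality_columns.to_vec();
    for &col in input_stream_key {
        // Append stream-key columns only if they are not already in the locality prefix,
        // keeping the key unique per input row.
        if !pk.contains(&col) {
            pk.push(col);
        }
    }
    pk
}

fn main() {
    // e.g. locality on column 2 (o_orderkey) with input stream key [0, 2]:
    assert_eq!(buffer_table_pk(&[2], &[0, 2]), vec![2, 0]);
}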

Performance

TPC-H Q18 with scale = 1 GB.
We limit the compute node memory to 2 GB.

Backfill throughput

With locality backfill, the backfill finishes in 7 minutes, while without locality backfill the job is too slow to finish.


Cache miss ratio

With locality backfill, our cache miss ratio is much lower than without it.

Plan

set enable_locality_backfill = true;
explain create materialized view q18 as    select
      c_name,
      c_custkey,
      o_orderkey,
      o_orderdate,
      o_totalprice,
      sum(l_quantity) quantity
    from
      customer,
      orders,
      lineitem
    where
      o_orderkey in (
        select
          l_orderkey
        from
          lineitem
        group by
          l_orderkey
        having
          sum(l_quantity) > 1
      )
      and c_custkey = o_custkey
      and o_orderkey = l_orderkey
    group by
      c_name,
      c_custkey,
      o_orderkey,
      o_orderdate,
      o_totalprice
    order by
      o_totalprice desc,
      o_orderdate
    LIMIT 100;

 StreamMaterialize { columns: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, quantity], stream_key: [c_custkey, c_name, o_orderkey, o_totalprice, o_orderdate], pk_columns: [o_totalprice, o_orderdate, c_custkey, c_name, o_orderkey], pk_conflict: NoCheck }
 └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity)] }
   └─StreamTopN { order: [orders.o_totalprice DESC, orders.o_orderdate ASC], limit: 100, offset: 0 }
     └─StreamExchange { dist: Single }
       └─StreamGroupTopN { order: [orders.o_totalprice DESC, orders.o_orderdate ASC], limit: 100, offset: 0, group_key: [$expr1] }
         └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity), Vnode(orders.o_orderkey) as $expr1] }
           └─StreamHashAgg { group_key: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate], aggs: [sum(lineitem.l_quantity), count] }
             └─StreamLocalityProvider { locality_columns: [0, 1, 2, 3, 4] }
               └─StreamExchange [no_shuffle] { dist: HashShard(orders.o_orderkey) }
                 └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey }
                   ├─StreamExchange { dist: HashShard(orders.o_orderkey) }
                   │ └─StreamLocalityProvider { locality_columns: [2] }
                   │   └─StreamExchange [no_shuffle] { dist: HashShard(orders.o_orderkey) }
                   │     └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey }
                   │       ├─StreamExchange { dist: HashShard(orders.o_orderkey) }
                   │       │ └─StreamLocalityProvider { locality_columns: [2] }
                   │       │   └─StreamExchange { dist: HashShard(orders.o_orderkey) }
                   │       │     └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey }
                   │       │       ├─StreamExchange { dist: HashShard(customer.c_custkey) }
                   │       │       │ └─StreamLocalityProvider { locality_columns: [0] }
                   │       │       │   └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(customer.c_custkey) }
                   │       │       │     └─StreamTableScan { table: customer, columns: [c_custkey, c_name] }
                   │       │       └─StreamExchange { dist: HashShard(orders.o_custkey) }
                   │       │         └─StreamLocalityProvider { locality_columns: [1] }
                   │       │           └─StreamExchange { dist: HashShard(orders.o_custkey) }
                   │       │             └─StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate] }
                   │       └─StreamExchange { dist: HashShard(lineitem.l_orderkey) }
                   │         └─StreamLocalityProvider { locality_columns: [0] }
                   │           └─StreamExchange { dist: HashShard(lineitem.l_orderkey) }
                   │             └─StreamTableScan { table: lineitem, columns: [l_orderkey, l_quantity, l_linenumber] }
                   └─StreamExchange { dist: HashShard(lineitem.l_orderkey) }
                     └─StreamProject { exprs: [lineitem.l_orderkey] }
                       └─StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Decimal) }
                         └─StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] }
                           └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] }
                             └─StreamLocalityProvider { locality_columns: [0] }
                               └─StreamExchange { dist: HashShard(lineitem.l_orderkey) }
                                 └─StreamTableScan { table: lineitem, columns: [l_orderkey, l_quantity, l_linenumber] }
(38 rows)

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • I have checked the Release Timeline and Currently Supported Versions to determine which release branches I need to cherry-pick this PR into.

Documentation

  • My PR needs documentation updates.
Release note

Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR introduces locality enforcement and locality backfill to RisingWave's streaming engine, extending the existing index selection functionality to improve performance during large historical data backfilling in memory-limited scenarios.

Key changes include:

  • Implementation of LocalityProvider operators that buffer data with locality column ordering during backfill
  • Extension of dependency ordering to ensure proper sequencing between scan backfill and locality backfill operations
  • Addition of session configuration to enable/disable locality backfill functionality

Reviewed Changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 2 comments.

Summary per file:

  • src/stream/src/executor/locality_provider.rs: Core executor implementation for locality-aware backfilling with proper state management
  • src/stream/src/from_proto/locality_provider.rs: Protocol buffer conversion and executor builder for LocalityProvider
  • src/frontend/src/optimizer/plan_node/stream_locality_provider.rs: Stream plan node implementation with state and progress table catalog building
  • src/frontend/src/optimizer/plan_node/logical_locality_provider.rs: Logical plan node with column pruning and predicate pushdown support
  • src/meta/src/stream/stream_graph/fragment.rs: Fragment dependency analysis for LocalityProvider ordering
  • src/meta/src/model/stream.rs: Backfill upstream type classification for LocalityProvider
  • src/common/src/session_config/mod.rs: Session configuration parameter for enabling locality backfill
  • proto/stream_plan.proto: Protocol buffer definition for LocalityProviderNode
Comments suppressed due to low confidence (1)

src/stream/src/executor/locality_provider.rs:1

  • The magic number 1024 should be defined as a named constant or made configurable to improve maintainability and allow tuning.
// Copyright 2025 RisingWave Labs

}
}

// TODO: truncate the state table after backfill.
Copilot AI Sep 23, 2025

This TODO comment indicates incomplete functionality. State table truncation after backfill should be implemented or tracked in a proper issue management system.

Suggested change
// TODO: truncate the state table after backfill.
// Truncate the state table after backfill to free resources.
state_table.truncate().await?;


@yuhao-su yuhao-su self-requested a review September 25, 2025 07:15
@chenzl25 chenzl25 requested review from kwannoel and wenym1 September 29, 2025 09:32
@chenzl25
Contributor Author

chenzl25 commented Sep 29, 2025

This PR is ready to review.
Basically the PR contains 3 parts:

  • optimizer: Introduce LocalityProvider and use it together with try_better_locality method.
  • backfill ordering: Deal with the dependencies among scan and locality backfill fragments.
  • executor: Buffer chunks until StartFragmentBackfill, then backfill from the buffered state table.
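
As a rough illustration of the executor part, here is a simplified sketch (stand-in types only, not the PR's actual executor): chunks are written to the buffer state table until StartFragmentBackfill arrives; then the buffered rows are drained in key order, i.e. locality order, and later chunks pass through.

// Simplified two-phase sketch of the LocalityProvider executor.
enum Message {
    Chunk(Vec<String>),    // stand-in for a stream chunk
    StartFragmentBackfill, // barrier mutation that kicks off this locality backfill
    Barrier,               // commit point for the state tables
}

struct LocalityProviderSketch {
    buffer: Vec<Vec<String>>, // stand-in for the buffer state table
    backfill_started: bool,
}

impl LocalityProviderSketch {
    fn on_message(&mut self, msg: Message) {
        match msg {
            // Phase 1: before the signal, buffer every chunk into the state table
            // (keyed by locality columns + stream key).
            Message::Chunk(chunk) if !self.backfill_started => self.buffer.push(chunk),
            // Phase 2: on StartFragmentBackfill, drain the buffered rows in key order
            // (locality order) and emit them downstream, tracking progress separately.
            Message::StartFragmentBackfill => {
                self.backfill_started = true;
                for row in self.buffer.drain(..) {
                    println!("emit buffered row: {:?}", row);
                }
            }
            // After backfill has started, chunks simply pass through.
            Message::Chunk(chunk) => println!("pass through: {:?}", chunk),
            Message::Barrier => { /* commit buffer and progress tables here */ }
        }
    }
}

fn main() {
    let mut exec = LocalityProviderSketch { buffer: Vec::new(), backfill_started: false };
    exec.on_message(Message::Chunk(vec!["row before backfill".into()]));
    exec.on_message(Message::StartFragmentBackfill);
    exec.on_message(Message::Chunk(vec!["row after backfill".into()]));
}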

@chenzl25 chenzl25 mentioned this pull request Sep 29, 2025
@kwannoel
Contributor

This PR is ready to review.
Basically the PR contains 3 parts:

  • optimizer: Introduce LocalityProvider and use it together with try_better_locality method.
  • backfill ordering: Deal with the dependencies among scan and locality backfill fragments.
  • executor: Buffer chunks until StartFragmentBackfill, then backfill from the buffered state table.

Very cool, I'll take a look tomorrow!

@kwannoel
Contributor

This pull request introduces a new "locality backfill" feature for streaming queries, which enables more efficient backfilling by grouping and buffering input data using specified locality columns. The changes span the SQL test suite, session configuration, planner, protobuf definitions, and catalog metadata to support this feature end-to-end. The most important changes are grouped below:

Locality Backfill Feature Implementation

  • Added the LocalityProvider plan node (src/frontend/src/optimizer/plan_node/generic/locality_provider.rs) and integrated it into the planner logic, enabling operators to buffer input data by locality columns during backfill. This includes changes to the planner's aggregation and join logic to use the new node when the feature is enabled. [1] [2] [3] [4] [5] [6]
  • Added the LocalityProviderNode protobuf message and registered it as a fragment type, allowing the streaming engine to recognize and process locality backfill nodes. [1] [2] [3] [4]

Configuration and Test Coverage

  • Introduced the enable_locality_backfill session config parameter (default: false), making the feature opt-in and allowing users to control its activation. [1] [2]
  • Added new SQL logic tests and planner test cases to verify locality backfill behavior, including a dedicated end-to-end test and planner output validation. [1] [2] [3]
  • Updated the CI test script to include the locality backfill test in the automated pipeline. [1] [2]

Catalog and Utility Updates

  • Registered the new fragment type in the catalog and updated related utilities and test cases to support the LocalityProvider node. [1] [2] [3]

These changes collectively enable more granular and efficient backfill operations in streaming queries, controlled by a session-level configuration and fully integrated into the query planner and execution engine.

@BugenZhao
Member

This PR is ready to review.

I'm wondering if it's possible to split this PR into smaller ones to make the review process easier?

Member

@BugenZhao BugenZhao left a comment

Looks great to me! Reviewed all files except stream.

Comment on lines +1780 to +1786
Some(
LogicalLocalityProvider::new(
self.clone_with_left_right(self.left(), self.right()).into(),
columns.to_owned(),
)
.into(),
)
Member

Just realized that we generate a LocalityProvider only when we encounter an upstream with a different locality. If I'm understanding correctly, we will generate a plan like

Join -> LocalityProvider -> Filter -> Project -> Agg

instead of

Join -> Filter -> Project -> LocalityProvider -> Agg

Does it mean that we need to buffer unnecessary data that could have been filtered out by those stateless executors?

Contributor Author

Good insight. To resolve this issue, I think we should add the LocalityProvider to the requirement side instead of the provider side.

Comment on lines +729 to +732
if let Some(better_plan) = self.try_better_locality_inner(columns) {
Some(better_plan)
} else if self.ctx().session_ctx().config().enable_locality_backfill() {
Some(LogicalLocalityProvider::new(self.clone().into(), columns.to_owned()).into())
Member

This pattern seems to appear multiple times. Is there any way to extract it?

Contributor Author

Yep. I will refactor this part together with this optimization in the following PR.
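
For reference, a possible shape for the extracted helper (a hedged sketch with simplified stand-in types; the helper name and signature are assumptions, not the PR's actual API):

// Stand-in for the planner's plan node reference.
#[derive(Debug)]
struct PlanRef(String);

// Hypothetical helper: try the operator-specific locality rewrite first, and only
// fall back to wrapping the plan in a LocalityProvider when enable_locality_backfill is on.
fn enforce_locality_if_needed(
    inner_attempt: Option<PlanRef>, // result of try_better_locality_inner
    enable_locality_backfill: bool, // session config flag
    this_plan: PlanRef,
    columns: &[usize],
) -> Option<PlanRef> {
    inner_attempt.or_else(|| {
        enable_locality_backfill.then(|| {
            PlanRef(format!(
                "LocalityProvider(columns={:?}, input={})",
                columns, this_plan.0
            ))
        })
    })
}

fn main() {
    let plan = PlanRef("LogicalJoin".to_string());
    println!("{:?}", enforce_locality_if_needed(None, true, plan, &[0, 2]));
}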

Comment on lines +47 to +48
// If the input is hash-distributed, we make it a UpstreamHashShard distribution
// just like a normal table scan. It is used to ensure locality provider is in its own fragment.
Member

Does it mean that this is actually a workaround for the backfill order control to work correctly? I guess we can keep the HashShard and omit one shuffle if it's solely for correctness.

Contributor Author

Not only for backfill order control but also for backfilling itself. Consider the case where the downstream is a HashJoin and both inputs' distributions are HashShard and already satisfy the join key distribution requirement. Then, in the same fragment, we would have 2 backfilling nodes, which is coupled and not scalable.

Member

Then, in the same fragment, we would have 2 backfilling nodes, which is coupled and not scalable.

This is true, because we want different tables to be scaled independently regardless of whether there's a downstream job joining them together.

However, for locality backfill nodes, I think it's okay to couple the parallelism and distribution here, as we don't support scaling a specific fragment now (cc @shanicky). If having multiple backfill nodes in a single fragment/actor could be a performance concern, we can also split them using no-shuffle exchanges, which also maintains the existing assumption that there should be at most one backfill node per fragment.

Member

This also seems to introduce unnecessary Exchange nodes, which may harm performance even after the backfilling stage is finished... 🤔


// Set locality columns as primary key.
for locality_col_idx in self.locality_columns() {
catalog_builder.add_order_column(*locality_col_idx, OrderType::ascending());
Member

Just realized that ideally we should also have Order as a requirement for "better locality".

Contributor Author

But it seems the order doesn't matter; any order can provide the locality we want.

Member

Just brainstorming: if the downstream is a MAX aggregation, then descending order may provide better locality?

Contributor Author

IIUC, the ordering here for MAX aggregation is a CPU optimization (e.g. a shortcut so we don't need to update the TopN cache) rather than an IO optimization.

Contributor Author

If we use Order as a requirement for "better locality", we would need to enumerate all possible combinations on the requirement side, which makes the algorithm's complexity grow quickly.

Comment on lines +162 to +164
/// Schema: | vnode | pk(locality columns + input stream keys) | `backfill_finished` | `row_count` |
/// Key: | vnode | pk(locality columns + input stream keys) |
fn build_progress_catalog(&self, state: &mut BuildFragmentGraphState) -> TableCatalog {
Member

Can we unify this with other backfill nodes?

Contributor Author

The table scan backfill progress table schema is not always the same as the locality backfill's, e.g. when the StreamScanType is SnapshotBackfill, so we can't unify them.

/// Build catalog for backfill state
///
/// When `stream_scan_type` is not `StreamScanType::SnapshotBackfill`:
///
/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
///
/// key: | vnode |
/// value: | pk ... | `backfill_finished` | `row_count` |
///
/// When we update the backfill progress,
/// we update it for all vnodes.
///
/// `pk` refers to the upstream pk which we use to track the backfill progress.
///
/// `vnode` is the corresponding vnode of the upstream's distribution key.
/// It should also match the vnode of the backfill executor.
///
/// `backfill_finished` is a boolean which just indicates if backfill is done.
///
/// `row_count` is a count of rows which indicates the # of rows per executor.
/// We used to track this in memory.
/// But for backfill persistence we have to also persist it.
///
/// FIXME(kwannoel):
/// - Across all vnodes, the values are the same.
/// - e.g.
/// | vnode | pk ... | `backfill_finished` | `row_count` |
/// | 1002 | Int64(1) | t | 10 |
/// | 1003 | Int64(1) | t | 10 |
/// | 1003 | Int64(1) | t | 10 |
///
/// Eventually we should track progress per vnode, to support scaling with both mview and
/// the corresponding `no_shuffle_backfill`.
/// However this is not high priority, since we are working on supporting arrangement backfill,
/// which already has this capability.
///
///
/// When `stream_scan_type` is `StreamScanType::SnapshotBackfill`:
///
/// Schema: | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...
///
/// key: | vnode |
/// value: | `epoch` | `row_count` | `is_epoch_finished` | pk ...
pub fn build_backfill_state_catalog(
&self,
state: &mut BuildFragmentGraphState,
stream_scan_type: StreamScanType,
) -> TableCatalog {

Contributor

@kwannoel kwannoel left a comment

Executor part looks good to me.

@chenzl25
Contributor Author

chenzl25 commented Oct 9, 2025

I'm wondering if it's possible to split this PR into smaller ones to make the review process easier?

To split the PR, I would need to guarantee that everything still works as expected after merging each piece. Considering the code is not that large right now and this is a minimal PR to demonstrate the functionality, I will keep it as is. Any further optimizations will be listed in the tracking issue.

@chenzl25 chenzl25 added this pull request to the merge queue Oct 10, 2025
Merged via the queue into main with commit e036611 Oct 10, 2025
42 of 43 checks passed
@chenzl25 chenzl25 deleted the dylan/support_locality_enforcement branch October 10, 2025 05:47